package local

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"gitee.com/gitee-go/core"
	"gitee.com/gitee-go/core/common"
	"gitee.com/gitee-go/core/model/pipeline"
	"gitee.com/gitee-go/core/models"
	"gitee.com/gitee-go/core/runtime"
	"gitee.com/gitee-go/dag-core/dag"
	"gitee.com/gitee-go/server/comm"
	"gitee.com/gitee-go/server/comm/notice"
	"gitee.com/gitee-go/server/comm/plugin"
	"gitee.com/gitee-go/server/comm/repo"
	ymldag "gitee.com/gitee-go/server/comm/yml/dag"
	"gitee.com/gitee-go/server/comm/yml/vars"
	"gitee.com/gitee-go/server/service"
	"gitee.com/gitee-go/utils"
	"gitee.com/gitee-go/utils/gitex"
	"gitee.com/gitee-go/utils/ioex"
	"github.com/go-git/go-git/v5"
	"github.com/go-git/go-git/v5/plumbing"
	ghttp "github.com/go-git/go-git/v5/plumbing/transport/http"
	"gopkg.in/yaml.v2"
	"io/ioutil"
	"os"
	"path/filepath"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"time"
)

// pipelinelk is held by saveOrUpdatePipeline for its whole duration, so its
// look-up-then-insert of pipeline and pipeline-branch rows is not interleaved
// between goroutines of this process.
var pipelinelk sync.Mutex

// taskPreBuild carries out the actual execution of one triggered task
// (webhook or manual trigger).
type taskPreBuild struct {
	// ctx is checked (ioex.CheckContext) before each major step of the task.
	ctx      context.Context
	// prt is the owning engine; its bdegn and cliegn members are used by the task.
	prt      *PreBuildEngine
	// preBuild is the trigger request this task processes.
	preBuild *runtime.PreBuild
	// cancel is invoked when the task finishes or fails to start.
	cancel   context.CancelFunc

	id  string
	// end is set to true once the task has finished.
	end bool
}

// start validates the task and, when it is valid, launches prepareBuild in
// its own goroutine. An invalid task is marked as ended and cancelled at once.
func (c *taskPreBuild) start() {
	c.end = false
	if ok := c.check(); !ok {
		c.end = true
		c.cancel()
		return
	}
	// When the request already references a pipeline version, flag it as running.
	if versionID := c.preBuild.Info.PipelineVersionId; versionID != "" {
		if err := updatePipelineVersion(versionID, common.PIPELINE_VERSION_STATUS_RUNNING); err != nil {
			core.Log.Errorf("start updatePipelineVersion err :%v", err)
		}
	}
	go c.prepareBuild(c.preBuild)
}

// check reports whether the task carries a preBuild request. A missing
// request is logged as an error.
func (c *taskPreBuild) check() bool {
	hasPreBuild := c.preBuild != nil
	if !hasPreBuild {
		core.Log.Errorf("taskPreBuild start err : preBuild is empty")
	}
	return hasPreBuild
}

// prepareBuild is the body of a pre-build task. It clones the repository into
// a temporary work directory, decides whether the request has to be handled as
// a webhook or as a manual build, and hands over to webhookToBuild or
// manualToBuild. The work directory is removed and the task is marked as ended
// (and its context cancelled) when the function returns; panics are recovered
// and logged.
func (c *taskPreBuild) prepareBuild(pb *runtime.PreBuild) {
	defer func() {
		c.end = true
		c.cancel()
		if err := recover(); err != nil {
			core.LogPnc.Errorf("taskPreBuild start: %v", err)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
		}
	}()
	// Temporary work directory the repository is cloned into.
	workPath := filepath.Join(comm.WorkPath, comm.Repo, utils.NewXid(), utils.RandomString(6))
	defer func() {
		if err := c.removeRepo(workPath); err != nil {
			core.Log.Errorf("PreBuildEngine RemoveRepo  err: %v", err)
		}
	}()
	if ioex.CheckContext(c.ctx) {
		return
	}
	// Clone the repository; getRepo logs and reports its own failures.
	if _, err := c.getRepo(pb, workPath); err != nil {
		return
	}
	triggerType := pb.TriggerType
	if pb.TriggerType == common.TRIGGER_TYPE_BUILD {
		db := comm.DBMain.GetDB()
		tp := &pipeline.TPipeline{}
		found, err := db.Where("id = ?", pb.Info.PipelineId).Get(tp)
		// The error has to be inspected before the found flag: otherwise a
		// failed query that also reports found == false is logged as
		// "pipeline not found" and the real database error is lost.
		if err != nil {
			core.Log.Errorf("PreBuildEngine getYmlsByManual db err: %v", err)
			return
		}
		if !found {
			core.Log.Error("未找到对应流水线")
			return
		}
		// A rebuild follows the type recorded on the stored pipeline.
		triggerType = tp.PipelineType
	}
	if triggerType == common.TRIGGER_TYPE_WEBHOOK {
		// Persist the parsed webhook pipelines and push them to the build engine.
		c.webhookToBuild(pb, workPath)
		return
	}
	if err := c.manualToBuild(pb, workPath); err != nil {
		core.Log.Errorf("manualToBuild db: %v", err)
	}
}

// webhookToBuild handles a webhook-triggered request. It loads every pipeline
// yml found in the cloned repository and, for each one that is not skipped by
// SkipTriggerRules, validates it and resolves its plugins, stores the
// pipeline / pipeline version / build rows, pushes the build to the build
// engine and finally marks the pipeline version as OK. A failure on one
// pipeline is logged and the loop moves on to the next one. The loop stops as
// soon as ioex.CheckContext(c.ctx) reports true.
func (c *taskPreBuild) webhookToBuild(pb *runtime.PreBuild, workPath string) {
	if ioex.CheckContext(c.ctx) {
		return
	}
	ymls, err := c.getYmlsByHook(pb, workPath)
	if err != nil {
		core.Log.Errorf("webhookToBuild  err: %v", err)
		return
	}
	// Walk over every pipeline yml found in the repository.
	for i := 0; !ioex.CheckContext(c.ctx) && i < len(ymls); i++ {
		yl := ymls[i]
		if SkipTriggerRules(pb, yl) {
			core.Log.Debugf("webhookToBuild SkipTriggerRules pipelineName :%v ", yl.Pie.Name)
			continue
		}
		if err := c.checkAndGetPlugin(pb, yl); err != nil {
			core.Log.Errorf("webhookToBuild isMatched pipelineName :%v  err:%v", yl.Pie.Name, err)
			continue
		}
		// Each step below logs the error it actually received; previously the
		// stale outer err was printed, hiding the real failure reason.
		tvp, err := c.saveOrUpdatePipeline(pb, yl)
		if err != nil {
			core.Log.Errorf("webhookToBuild saveOrUpdatePipeline pipelineName :%v  err:%v", yl.Pie.Name, err)
			continue
		}
		tb, err := c.saveBuild(pb, yl, tvp)
		if err != nil {
			core.Log.Errorf("webhookToBuild saveBuild pipelineName :%v  err:%v", yl.Pie.Name, err)
			continue
		}
		yl.Pie.Id = tvp.Id
		if err = c.putRuntimeBuild(pb, yl, tb, tvp, workPath); err != nil {
			core.Log.Errorf("webhookToBuild PutRuntimeBuild pipelineName :%v  err:%v", yl.Pie.Name, err)
			continue
		}
		if err = updatePipelineVersion(tvp.Id, common.PIPELINE_VERSION_STATUS_OK); err != nil {
			core.Log.Errorf("webhookToBuild updatePipelineVersion  :%v  err:%v", yl.Pie.Name, err)
		}
	}
}

// saveBuild stores a new TBuild for the pipeline version tv together with one
// TStage per stage, one TJob per job and the TArtifact / TDependArtifact rows
// of every job. The generated ids, timestamps and statuses are copied back
// onto the in-memory stage / job / artifact objects of yl so they can later be
// converted into the runtime structure.
//
// On any error the pipeline version is marked as failed and a failure notice
// is sent. A panic inside the function is recovered, logged and reported as an
// error as well, so callers never receive a nil build together with a nil
// error.
func (c *taskPreBuild) saveBuild(pb *runtime.PreBuild, yl *models.YML, tv *pipeline.TPipelineVersion) (tBuild *pipeline.TBuild, rterr error) {
	defer func() {
		if r := recover(); r != nil {
			core.LogPnc.Errorf("saveBuild:%+v", r)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
			// Turn the panic into an ordinary error so the caller stops
			// instead of continuing with a nil build.
			tBuild = nil
			rterr = fmt.Errorf("saveBuild panic: %v", r)
		}
		if rterr == nil {
			return
		}
		// The nil guards only matter on the panic path; on ordinary error
		// returns tv, yl and yl.Pie have already been dereferenced.
		if tv != nil {
			if err := updatePipelineVersion(tv.Id, common.PIPELINE_VERSION_STATUS_ERROR, rterr.Error()); err != nil {
				core.Log.Errorf("saveOrUpdateBuild updatePipelineVersion  err: %v", err)
			}
		}
		if yl != nil && yl.Pie != nil {
			if err := sendPipelineFailedNotice(pb, yl, rterr); err != nil {
				core.Log.Errorf("saveOrUpdateBuild sendPipelineFailedNotice  err: %v", err)
			}
		}
	}()

	core.Log.Debugf("saveOrUpdateBuild pipelineName :%v ", yl.Pie.Name)
	db := comm.DBMain.GetDB()
	p := yl.Pie
	tb := &pipeline.TBuild{}
	tb.Id = utils.NewXid()
	tb.PipelineId = tv.PipelineId
	tb.PipelineVersionId = tv.Id
	tb.Status = common.BUILD_STATUS_PENDING
	tb.Created = time.Now()
	if _, err := db.Insert(tb); err != nil {
		return nil, err
	}

	stages := p.Stages
	if len(stages) == 0 {
		return nil, errors.New("stages is empty")
	}
	for i, stage := range stages {
		// Database representation of the stage.
		ts := &pipeline.TStage{}
		ts.Id = utils.NewXid()
		ts.PipelineVersionId = tv.Id
		ts.BuildId = tb.Id
		ts.Name = stage.Name
		ts.DisplayName = stage.DisplayName
		ts.Created = time.Now()
		ts.Version = stage.Version
		ts.Sort = int64(i + 1)
		ts.Stage = stage.Stage
		ts.Status = common.BUILD_STATUS_PENDING
		if _, err := db.Insert(ts); err != nil {
			return nil, err
		}
		// Copy the stored values back in preparation for the runtime conversion.
		stage.Id = ts.Id
		stage.PipelineVersionId = ts.PipelineVersionId
		stage.BuildId = ts.BuildId
		stage.Created = ts.Created
		stage.Status = ts.Status

		jobs := stage.Jobs
		if len(jobs) == 0 {
			core.Log.Warning("the jobs is empty")
			continue
		}
		for j, job := range jobs {
			if job.Name == "" {
				return nil, errors.New("job name is empty")
			}
			tj := &pipeline.TJob{}
			djs, err := json.Marshal(job.DependsOn)
			if err != nil {
				core.Log.Errorf("Marshal err : %v", err)
				return nil, err
			}
			tj.DependsOn = string(djs)
			de, err := json.Marshal(job.Environments)
			if err != nil {
				core.Log.Errorf("Marshal err : %v", err)
				return nil, err
			}
			tj.Environments = string(de)
			tj.Id = utils.NewXid()
			tj.BuildId = tb.Id
			tj.StageId = ts.Id
			tj.PipelineVersionId = tv.Id
			tj.DisplayName = job.DisplayName
			tj.Job = job.Job
			tj.Name = job.Name
			tj.Created = time.Now()
			tj.Version = stage.Version
			tj.Sort = int64(j + 1)
			tj.Status = common.BUILD_STATUS_PENDING
			tj.Commands = job.Command
			if _, err = db.Insert(tj); err != nil {
				return nil, err
			}
			job.Id = tj.Id
			job.BuildId = tj.BuildId
			job.StageId = tj.StageId
			job.Created = tj.Created
			job.Status = tj.Status

			// Artifacts produced by the job. A failed insert is only logged,
			// the build is still created.
			for _, artifact := range job.Artifacts {
				tArtifact := &pipeline.TArtifact{
					Id:         utils.NewXid(),
					JobId:      tj.Id,
					BuildId:    tb.Id,
					StageId:    ts.Id,
					BuildName:  "",
					StageName:  ts.Name,
					JobName:    tj.Name,
					Created:    time.Now(),
					Name:       artifact.Name,
					Scope:      artifact.Scope,
					Path:       artifact.Path,
					Repository: artifact.Repository,
					Value:      artifact.Value,
				}
				artifact.Id = tArtifact.Id
				artifact.BuildId = tArtifact.BuildId
				artifact.StageId = tArtifact.StageId
				artifact.JobId = tArtifact.JobId
				artifact.BuildName = tArtifact.BuildName
				artifact.StageName = tArtifact.StageName
				artifact.JobName = tArtifact.JobName
				if _, err = db.Insert(tArtifact); err != nil {
					core.Log.Errorf("to Build insert artifact err:%v", err)
				}
			}

			// Artifacts the job depends on. A failed insert is only logged.
			for _, dependArtifact := range job.DependArtifacts {
				td := &pipeline.TDependArtifact{
					Id:          utils.NewXid(),
					JobId:       tj.Id,
					BuildId:     tb.Id,
					StageId:     ts.Id,
					BuildName:   "",
					StageName:   ts.Name,
					JobName:     tj.Name,
					Created:     time.Now(),
					Name:        dependArtifact.Name,
					Type:        dependArtifact.Type,
					Repository:  dependArtifact.Repository,
					Target:      dependArtifact.Target,
					SourceStage: dependArtifact.SourceStage,
					SourceJob:   dependArtifact.SourceJob,
					Value:       dependArtifact.Value,
				}
				// NOTE(review): the stored flag is set when IsForce is false;
				// confirm this inversion is intended.
				if !dependArtifact.IsForce {
					td.IsForce = 1
				}

				dependArtifact.BuildId = td.BuildId
				dependArtifact.StageId = td.StageId
				dependArtifact.JobId = td.JobId
				dependArtifact.BuildName = td.BuildName
				dependArtifact.StageName = td.StageName
				dependArtifact.JobName = td.JobName

				if _, err = db.Insert(td); err != nil {
					core.Log.Errorf("to Build insert dependArtifact err:%v", err)
				}
			}
		}
	}
	return tb, nil
}
// putRuntimeBuild converts the stored build tb of pipeline yl into its runtime
// form, zips the cloned repository for that build and submits the runtime
// build to the build engine. When an error is returned the pipeline version
// tvp (if it has an id) is marked as failed and a failure notice is sent.
func (c *taskPreBuild) putRuntimeBuild(pb *runtime.PreBuild, yl *models.YML, tb *pipeline.TBuild, tvp *pipeline.TPipelineVersion, workPath string) (rterr error) {
	core.Log.Debugf("PutRuntimeBuild pipeliName :%v ", yl.Pie.Name)
	defer func() {
		if rterr == nil {
			return
		}
		if tvp != nil && tvp.Id != "" {
			if err := updatePipelineVersion(tvp.Id, common.PIPELINE_VERSION_STATUS_ERROR, rterr.Error()); err != nil {
				core.Log.Errorf("saveOrUpdatePipelineVersionAndBuild updatePipelineVersion err: %v", err)
			}
		}
		if err := sendPipelineFailedNotice(pb, yl, rterr); err != nil {
			core.Log.Errorf("saveOrUpdatePipelineVersionAndBuild  sendPipelineFailedNotice err: %v", err)
		}
	}()
	// tb.Id is read below; report a missing build as an error instead of
	// panicking on a nil pointer.
	if tb == nil {
		return errors.New("build is empty")
	}
	// Convert to the runtime structure.
	toRuntime, err := yl.BuildToRuntime(tb)
	if err != nil {
		core.Log.Errorf("PreBuildEngine BuildToRuntime  err: %v", err)
		return fmt.Errorf("转换为runtime错误")
	}
	toRuntime.Vars = yl.Pie.Vars
	// Zip the repository for the build. A failure here is logged only; the
	// build is still handed to the engine, as before.
	if _, err = c.repoToZip(workPath, tb.Id); err != nil {
		core.Log.Errorf("PreBuildEngine repoToZip  err: %v", err)
	}
	// Hand the build over to the build engine.
	if err = c.prt.bdegn.Put(toRuntime); err != nil {
		core.Log.Errorf("PreBuildEngine Put  err: %v", err)
		return fmt.Errorf("put engine err")
	}
	return nil
}
// saveOrUpdatePipeline stores (or refreshes) the pipeline described by yl and
// then stores its pipeline version, returning that version. When an error is
// returned a failure notice is sent and, if a version with an id is being
// returned alongside the error, that version is marked as failed.
func (c *taskPreBuild) saveOrUpdatePipeline(pb *runtime.PreBuild, yl *models.YML) (tvp *pipeline.TPipelineVersion, rterr error) {
	core.Log.Debugf("saveOrUpdatePipeline repoName :%v ", pb.GetRepository().Name)
	defer func() {
		if rterr == nil {
			return
		}
		if tvp != nil && tvp.Id != "" {
			if upErr := updatePipelineVersion(tvp.Id, common.PIPELINE_VERSION_STATUS_ERROR, rterr.Error()); upErr != nil {
				core.Log.Errorf("saveOrUpdatePipeline updatePipelineVersion err: %v", upErr)
			}
		}
		if noticeErr := sendPipelineFailedNotice(pb, yl, rterr); noticeErr != nil {
			core.Log.Errorf("saveOrUpdatePipeline  sendPipelineFailedNotice err: %v", noticeErr)
		}
	}()
	// Insert or update the pipeline row first, then its version.
	storedPipeline, err := saveOrUpdatePipeline(pb, yl.Pie)
	if err != nil {
		return nil, err
	}
	return c.saveOrUpdatePipelineVersion(pb, yl, storedPipeline)
}
// checkAndGetPlugin runs yl.Check and then resolves the plugins of the
// pipeline from its yml content. When either step fails, the pipeline version
// referenced by the request (if any) is marked as failed and a yml failure
// notice is sent.
func (c *taskPreBuild) checkAndGetPlugin(pb *runtime.PreBuild, yl *models.YML) (rterr error) {
	core.Log.Debugf("isMatched repoName :%v ", pb.GetRepository().Name)
	defer func() {
		// On failure: update the database and send the in-site failure message.
		if rterr == nil {
			return
		}
		if versionID := pb.Info.PipelineVersionId; versionID != "" {
			if upErr := updatePipelineVersion(versionID, common.PIPELINE_VERSION_STATUS_ERROR, rterr.Error()); upErr != nil {
				core.Log.Errorf("isMatched updatePipelineVersion err:%v", upErr)
			}
		}
		if noticeErr := sendYmlFailedNotice(pb, rterr); noticeErr != nil {
			core.Log.Errorf("isMatched sendYmlFailedNotice err:%v", noticeErr)
		}
	}()
	if checkErr := yl.Check(); checkErr != nil {
		return checkErr
	}
	if pluginErr := c.getPlugins(yl, yl.YmlContent); pluginErr != nil {
		return pluginErr
	}
	return nil
}

// getPlugins resolves the plugin of every job of pipeline y.
//
// content is the raw yml of the pipeline; it is parsed a second time into a
// models.PluginPipeline whose stages and jobs are matched to y.Pie.Stages by
// position. For each job the plugin named by job.Job is looked up; when one is
// found the job is flagged as plugined and its type, commands and plugin
// variables are taken from the parsed plugin. Job commands are then rendered
// into job.Command, and an error is returned when no runner exists for the
// resulting job type.
//
// An error is also returned when content has fewer stages or jobs than the
// pipeline, instead of panicking with an index out of range.
func (c *taskPreBuild) getPlugins(y *models.YML, content string) error {
	core.Log.Debugf("getPlugins pipelineName :%v ", y.Pie.Name)
	pluginPipeline := &models.PluginPipeline{}
	if err := yaml.Unmarshal([]byte(content), pluginPipeline); err != nil {
		core.Log.Errorf("getPlugins yaml.Unmarshal err: %v", err)
		return errors.New("解析插件YML失败")
	}
	pluginStages := pluginPipeline.Stages
	// Walk every job of every stage, look up the plugin it needs, parse it and
	// assemble the job commands.
	for i, stage := range y.Pie.Stages {
		jobs := stage.Jobs
		if len(jobs) == 0 {
			continue
		}
		// The two structures come from different parses (and, for manual
		// builds, from different stored columns), so their shapes are not
		// guaranteed to agree.
		if i >= len(pluginStages) {
			return fmt.Errorf("解析插件YML失败: stage %v 不存在", stage.Name)
		}
		pluginStage := pluginStages[i]
		for j, job := range jobs {
			if j >= len(pluginStage.Jobs) {
				return fmt.Errorf("解析插件YML失败: job %v 不存在", job.Name)
			}
			pluginJob := pluginStage.Jobs[j]
			pn := strings.TrimSpace(job.Job)
			pl, err := plugin.GetDBPlugin(pn)
			if err != nil {
				core.Log.Errorf("GetDBPlugin %v err: %v", pn, err)
				return errors.New("获取插件失败")
			}
			if pl != nil {
				job.Plugined = true
				parameter, varm, err := plugin.ParsingPlugin(job, pluginJob, pl)
				if err != nil {
					core.Log.Errorf("ParsingPlugin err:%v", err)
					return fmt.Errorf("解析插件%s 失败%v", pn, err)
				}
				job.Job = pl.Type
				job.Commands = parameter
				job.PlugVars = varm
			}
			if job.Commands != nil {
				command, err := vars.CommandsToString(job.Commands)
				if err != nil {
					core.Log.Errorf("CommandsToJson err : %v", err)
					return err
				}
				job.Command = command
			}
			if !c.prt.cliegn.ExistPlugRunner(job.Job) {
				return fmt.Errorf("not found plugin %s runner", job.Job)
			}
		}
	}
	return nil
}

// getYmlsByHook loads the pipeline ymls found in the yaml directory
// (comm.YamlDir) of the repository cloned at workPath.
//
// Only regular, non-empty entries whose name ends in ".yaml" or ".yml" are
// considered. A file that fails to load is logged and skipped. Pipelines whose
// Name occurs in more than one file are all dropped from the result. An error
// is returned when the directory cannot be read or is empty.
func (c *taskPreBuild) getYmlsByHook(pb *runtime.PreBuild, workPath string) (yls []*models.YML, rterr error) {
	core.Log.Debugf("getYmlsByHook repoName :%v ", pb.GetRepository().Name)
	entries, err := ioutil.ReadDir(filepath.Join(workPath, comm.YamlDir))
	if err != nil {
		return nil, err
	}
	if len(entries) < 1 {
		return nil, errors.New("not found any files")
	}
	ymls := make([]*models.YML, 0, len(entries))
	// nameCount records how many loaded files declare each pipeline name.
	nameCount := make(map[string]int, len(entries))
	for _, f := range entries {
		name := f.Name()
		// The directory test applies to both extensions (the former
		// "a || b && !dir" let "*.yaml" directories through), and the
		// extension has to be a real suffix so that backups such as
		// "x.yml.bak" are not parsed as pipelines.
		isYml := strings.HasSuffix(name, ".yaml") || strings.HasSuffix(name, ".yml")
		if f.IsDir() || !isYml {
			continue
		}
		core.Log.Infof("findYml file name is :%v ", name)
		if f.Size() < 1 {
			continue
		}
		y, err := getYmlByFile(pb, f, workPath)
		if err != nil {
			core.Log.Errorf("PreBuildEngine getYmlByFile %v err: %v", name, err)
			continue
		}
		nameCount[y.Pie.Name]++
		ymls = append(ymls, y)
	}

	// Keep only pipelines whose name is unique among the loaded files.
	unique := make([]*models.YML, 0, len(ymls))
	for _, y := range ymls {
		if nameCount[y.Pie.Name] > 1 {
			continue
		}
		unique = append(unique, y)
	}
	return unique, nil
}

// getYmlByFile loads a single pipeline definition from file f inside the yaml
// directory of workPath. The file is parsed as yaml, must declare a Name, is
// marshalled and checked against the DAG configuration, and then has its
// variables replaced for the repository of pb. The returned YML carries the
// raw file content and the marshalled content. Any failure triggers a yml
// failure notice; the elapsed time is logged on return.
func getYmlByFile(pb *runtime.PreBuild, f os.FileInfo, workPath string) (yl *models.YML, rterr error) {
	began := time.Now()
	defer func() {
		core.Log.Debugf("getYmlByFile end time:%.4fs", time.Since(began).Seconds())
	}()
	defer func() {
		if rterr == nil {
			return
		}
		if noticeErr := sendYmlFailedNotice(pb, rterr); noticeErr != nil {
			core.Log.Errorf("getYmlByFile sendYmlFailedNotice err:%v", noticeErr)
		}
	}()
	core.Log.Debugf("getYmlByFile repoName :%v ", pb.GetRepository().Name)

	raw, err := ioutil.ReadFile(filepath.Join(workPath, comm.YamlDir, f.Name()))
	if err != nil {
		core.Log.Errorf("PreBuildEngine ReadFile %v err: %v", f.Name(), err)
		return nil, err
	}
	if len(raw) < 1 {
		core.Log.Errorf("%v文件内容为空", f.Name())
		return nil, fmt.Errorf("%v文件内容为空", f.Name())
	}

	parsed := &models.Pipeline{}
	if err = yaml.Unmarshal(raw, parsed); err != nil {
		core.Log.Errorf("getYmlByFile yaml.Unmarshal err :%v", err)
		return nil, fmt.Errorf("%v 文件解析失败", f.Name())
	}
	result := &models.YML{}
	result.Pie = parsed
	if parsed.Name == "" {
		return nil, errors.New("yml Name 字段为空")
	}

	// Marshal the pipeline and validate it against the DAG configuration.
	encoded, err := result.Marshal()
	if err != nil {
		core.Log.Errorf("getYmlByFile y.Marshal err :%v", err)
		return nil, fmt.Errorf("%v 文件解析失败", f.Name())
	}
	dagConf, dagErr := ymldag.GetDAG()
	if dagErr != nil {
		core.Log.Errorf("getYmlByFile GetDAG err : %v", dagErr)
		return nil, fmt.Errorf("%v 文件获取dag配置失败", f.Name())
	}
	if checkErr := dag.CheckJsonWithDag(encoded, []byte(dagConf)); checkErr != nil {
		return nil, fmt.Errorf("%v 文件: %v", f.Name(), checkErr)
	}

	// Variable replacement happens after marshalling, so the stored content
	// is the pre-replacement form.
	if err = vars.YMLReplaceVar(result, pb.GetRepository().Id); err != nil {
		core.Log.Errorf("getYmlByFile YMLReplaceVar err :%v", err)
		return nil, fmt.Errorf("%v 文件变量替换失败 :%v", f.Name(), err)
	}
	result.YmlContent = string(raw)
	result.JsonContent = string(encoded)
	return result, nil
}

// manualToBuild handles a manually triggered (re)build. It plans the build
// from the content stored on the pipeline version referenced by
// pb.Info.PipelineVersionId instead of reading the ymls from the repository.
//
// The stored pipeline is decoded, has its variables replaced and its plugins
// resolved; a new build is then stored, pushed to the build engine and the
// pipeline version is marked as OK. When an error is returned, the pipeline
// version and its build / stage / job rows are marked as failed (if the
// version was loaded) and a failure notice is sent.
func (c *taskPreBuild) manualToBuild(pb *runtime.PreBuild, workPath string) (rterr error) {
	if ioex.CheckContext(c.ctx) {
		return nil
	}
	tpv := &pipeline.TPipelineVersion{}
	// Pie starts out as an empty pipeline: the deferred failure notice reads
	// yl.Pie.Name, which would be a nil dereference for every error returned
	// before the stored pipeline has been decoded.
	yl := &models.YML{Pie: &models.Pipeline{}}
	defer func() {
		if rterr == nil {
			return
		}
		if tpv != nil && tpv.Id != "" {
			if err := updatePipelineVersion(tpv.Id, common.PIPELINE_VERSION_STATUS_ERROR, rterr.Error()); err != nil {
				core.Log.Errorf("saveOrUpdatePipeline updatePipelineVersion err: %v", err)
			}
			updateBuild(tpv.Id, common.PIPELINE_VERSION_STATUS_ERROR)
		}
		if err := sendPipelineFailedNotice(pb, yl, rterr); err != nil {
			core.Log.Errorf("saveOrUpdatePipeline  sendPipelineFailedNotice err: %v", err)
		}
	}()
	core.Log.Debugf("manualToBuild repoName :%v ", pb.GetRepository().Name)
	db := comm.DBMain.GetDB()
	found, err := db.Where("id = ?", pb.Info.PipelineVersionId).Get(tpv)
	if err != nil {
		return err
	}
	if !found {
		return errors.New("未找到流水线！")
	}
	if tpv.JsonContent == "" {
		return errors.New("流水线配置为空！")
	}
	p := &models.Pipeline{}
	if err = json.Unmarshal([]byte(tpv.JsonContent), p); err != nil {
		return fmt.Errorf("getYmlsByManual yaml err: %v", err)
	}
	yl.Pie = p
	if p.Name == "" {
		return errors.New("流水线Name不能为空")
	}
	if err = vars.YMLReplaceVar(yl, pb.GetRepository().Id); err != nil {
		return fmt.Errorf("getYmlsByManual YMLReplaceVar err: %v", err)
	}
	if _, err = yl.Marshal(); err != nil {
		return errors.New("yml 文件解析失败")
	}
	if err = c.getPlugins(yl, tpv.YmlContent); err != nil {
		return err
	}
	build, err := c.saveBuild(pb, yl, tpv)
	if err != nil {
		return err
	}
	if err = c.putRuntimeBuild(pb, yl, build, tpv, workPath); err != nil {
		return err
	}
	return updatePipelineVersion(tpv.Id, common.PIPELINE_VERSION_STATUS_OK)
}
// updateBuild sets the status column (and rewrites the error column from the
// zero value) of every stage, job and build row that belongs to pipeline
// version tvpId. It does nothing when tvpId is empty. Each of the three
// updates is attempted regardless of the others and every failure is logged;
// previously only the error of the last update was inspected.
func updateBuild(tvpId, status string) {
	if tvpId == "" {
		return
	}
	db := comm.DBMain.GetDB()
	ts := &pipeline.TStage{
		Status: status,
	}
	if _, err := db.Where("pipeline_version_id = ?", tvpId).Cols("status,error").Update(ts); err != nil {
		core.Log.Errorf("updateBuild stage db err :%v", err)
	}
	tj := &pipeline.TJob{
		Status: status,
	}
	if _, err := db.Where("pipeline_version_id = ?", tvpId).Cols("status,error").Update(tj); err != nil {
		core.Log.Errorf("updateBuild job db err :%v", err)
	}
	tb := &pipeline.TBuild{
		Status: status,
	}
	if _, err := db.Where("pipeline_version_id = ?", tvpId).Cols("status,error").Update(tb); err != nil {
		core.Log.Errorf("updateBuild db err :%v", err)
	}
}
// sendNotice marshals the detail d into n.Infos and stores the notice n for
// the repository identified by openid. It returns the marshalling error or
// the error of the insert.
func sendNotice(openid string, d *notice.NoticeDetail, n *notice.Notice) error {
	core.Log.Debugf("send msg openid:%v  content:%v", openid, n.Content)
	infos, err := d.Marshal()
	if err != nil {
		return err
	}
	n.Infos = infos
	return service.InsertNoticesByRepoOpenid(openid, n)
}
// sendRepoFailedNotice sends a system notice to the owner of the request's
// repository saying that fetching the repository failed.
func sendRepoFailedNotice(pb *runtime.PreBuild) error {
	rp := pb.GetRepository()
	text := fmt.Sprintf(notice.REPO_FAILED, rp.Name)
	detail := &notice.NoticeDetail{
		Version:           "",
		Branch:            rp.Branch,
		FullName:          rp.FullName,
		Result:            notice.BUILD_RESULT_FAILED,
		Msg:               text,
		MStatus:           common.PIPELINE_VERSION_STATUS_ERROR,
		PipelineVersionId: pb.Info.PipelineVersionId,
		Openid:            rp.RepoOpenid,
		VersionStatus:     common.PIPELINE_VERSION_STATUS_ERROR,
	}
	msg := &notice.Notice{
		Content: text,
		Types:   common.MSG_TYPES_SYSYTEM,
	}
	return sendNotice(rp.RepoOpenid, detail, msg)
}
// sendPipelineFailedNotice sends a system notice to the owner of the request's
// repository saying that building pipeline yl failed with errs.
//
// yl, or its pipeline, may be missing when the failure happened before the
// pipeline could be loaded; the pipeline name in the notice is then left empty
// instead of dereferencing a nil pointer.
func sendPipelineFailedNotice(pb *runtime.PreBuild, yl *models.YML, errs error) error {
	pipelineName := ""
	if yl != nil && yl.Pie != nil {
		pipelineName = yl.Pie.Name
	}
	err := sendNotice(pb.GetRepository().RepoOpenid, &notice.NoticeDetail{
		Version:           "",
		Branch:            fmt.Sprintf("%v(%v)", pb.GetRepository().Branch, pb.GetRepository().Sha),
		FullName:          pb.GetRepository().FullName,
		Result:            notice.BUILD_RESULT_FAILED,
		Msg:               fmt.Sprintf(notice.BUILD_FAILED_ERR, pipelineName, errs),
		PipelineVersionId: pb.Info.PipelineVersionId,
		RepoId:            pb.GetRepository().Id,
		Openid:            pb.GetRepository().RepoOpenid,
		VersionStatus:     common.PIPELINE_VERSION_STATUS_ERROR,
		MStatus:           common.PIPELINE_VERSION_STATUS_ERROR,
	}, &notice.Notice{
		Content: fmt.Sprintf(notice.BUILD_FAILED, pipelineName),
		Types:   common.MSG_TYPES_SYSYTEM,
	})
	if err != nil {
		return err
	}
	return nil
}

// sendYmlFailedNotice sends a system notice (in-site message) to the owner of
// the request's repository saying that a pipeline yml could not be processed
// because of errs.
func sendYmlFailedNotice(pb *runtime.PreBuild, errs error) error {
	rp := pb.GetRepository()
	detail := &notice.NoticeDetail{
		Version:           "",
		Branch:            fmt.Sprintf("%v(%v)", rp.Branch, rp.Sha),
		FullName:          rp.FullName,
		Result:            notice.BUILD_RESULT_FAILED,
		Msg:               fmt.Sprintf(notice.BUILD_FAILED_ERR, "", errs),
		PipelineVersionId: pb.Info.PipelineVersionId,
		RepoId:            rp.Id,
		Openid:            rp.RepoOpenid,
		MStatus:           common.PIPELINE_VERSION_STATUS_ERROR,
		// NOTE(review): unlike the other failure notices this one reports the
		// version status as OK — confirm that is intended.
		VersionStatus: common.PIPELINE_VERSION_STATUS_OK,
	}
	msg := &notice.Notice{
		Content: fmt.Sprintf(notice.YML_FAILED, rp.Name),
		Types:   common.MSG_TYPES_SYSYTEM,
	}
	return sendNotice(rp.RepoOpenid, detail, msg)
}

// updatePipelineVersion writes status, and the error text, to the pipeline
// version row with the given id. The error column is set to errs[0] when
// exactly one message is passed and to the empty string otherwise. An error is
// returned when the id or the status is empty, or when the update fails. The
// elapsed time is logged on return.
func updatePipelineVersion(pipelineVersionId, status string, errs ...string) error {
	began := time.Now()
	defer func() {
		core.Log.Debugf("updatePipelineVersion end time:%.4fs", time.Since(began).Seconds())
	}()
	if pipelineVersionId == "" || status == "" {
		return errors.New("updatePipelineVersion param is empty")
	}
	row := &pipeline.TPipelineVersion{}
	row.Id = pipelineVersionId
	row.Status = status
	if len(errs) == 1 {
		row.Error = errs[0]
	}
	if _, err := comm.DBMain.GetDB().Where("id = ?", row.Id).Cols("status,error").Update(row); err != nil {
		return err
	}
	return nil
}

// getRepo clones the repository of pb into workPath and checks out the
// requested commit.
//
// The access token stored for the requesting user (if any) is used as basic
// auth. When the request carries no commit sha, the sha of the first commit
// returned for HEAD is recorded on the request's repository. When the request
// carries no commit message, the message of the checked-out commit is recorded
// on pb.Info.
//
// On failure the pipeline version referenced by the request is marked as
// failed (non-webhook triggers only) and a repository failure notice is sent.
// The elapsed time is logged on return.
func (c *taskPreBuild) getRepo(pb *runtime.PreBuild, workPath string) (gr *git.Repository, rterr error) {
	ts := time.Now()
	defer func() {
		core.Log.Debugf("GetRepo end time:%.4fs", time.Since(ts).Seconds())
	}()
	defer func() {
		// When fetching the code failed: update the record and send the
		// in-site message.
		if rterr == nil {
			return
		}
		if pb.TriggerType != common.TRIGGER_TYPE_WEBHOOK && pb.Info.PipelineVersionId != "" {
			if err := updatePipelineVersion(pb.Info.PipelineVersionId, common.PIPELINE_VERSION_STATUS_ERROR, rterr.Error()); err != nil {
				core.Log.Errorf("getRepo updatePipelineVersion err:%v", err)
			}
		}
		if err := sendRepoFailedNotice(pb); err != nil {
			core.Log.Errorf("getRepo sendRepoFailedNotice err:%v", err)
		}
	}()
	core.Log.Infof("git clone start repoName : %v", pb.GetRepository().Name)
	if workPath == "" {
		core.Log.Error("git clone err :workPath is empty")
		return nil, errors.New("workPath is empty")
	}
	db := comm.DBMain.GetDB()
	ut := &pipeline.TUserToken{}
	found, err := db.Where("name =? ", pb.Info.User.UserName).Cols("name,access_token").Get(ut)
	if err != nil {
		core.Log.Errorf("getRepo db err :%v", err)
		return nil, err
	}
	// Credentials stay empty when no token is stored for the user.
	basicAuth := &ghttp.BasicAuth{}
	if found {
		basicAuth.Username = ut.Name
		basicAuth.Password = ut.AccessToken
	}
	gc := &git.CloneOptions{
		URL:           pb.GetRepository().CloneURL,
		Auth:          basicAuth,
		ReferenceName: plumbing.ReferenceName(pb.GetRepository().Ref),
	}
	core.Log.Infof("git clone repoUrl : %v", gc.URL)
	repository, err := gitex.CloneRepo(workPath, gc, c.ctx)
	if err != nil {
		core.Log.Errorf("git CloneRepo err: %v", err)
		return nil, err
	}
	// No sha requested: resolve it from HEAD and record it on the request.
	if pb.GetRepository().Sha == "" {
		ref, err := repository.Head()
		if err != nil {
			core.Log.Errorf("git clone err: %v", err)
			return nil, err
		}
		cIter, err := gitex.GetLogs(repository, &git.LogOptions{From: ref.Hash()})
		if err != nil {
			core.Log.Errorf("git GetLogs GetLogs err: %v", err)
			return nil, err
		}
		next, err := cIter.Next()
		if err != nil {
			core.Log.Errorf("git GetLogs Sha err: %v", err)
			return nil, err
		}
		pb.GetRepository().Sha = next.Hash.String()
	}
	// Check out the requested commit.
	err = gitex.CheckOutHash(repository, pb.GetRepository().Sha)
	if err != nil {
		core.Log.Errorf("git CheckOutHash err: %v", err)
		return nil, err
	}
	// No commit message on the request: take it from the commit log.
	if pb.Info.CommitMessage == "" {
		cIter, err := gitex.GetLogs(repository, &git.LogOptions{From: plumbing.NewHash(pb.GetRepository().Sha)})
		if err != nil {
			// This error used to be dropped silently. The message is still
			// treated as optional when no iterator is returned.
			core.Log.Errorf("git GetLogs Message err: %v", err)
		}
		if cIter == nil {
			return repository, nil
		}
		next, err := cIter.Next()
		if err != nil {
			core.Log.Errorf("git GetLogs Message err: %v", err)
			return nil, err
		}
		pb.Info.CommitMessage = next.Message
	}

	if pb.Info.Events == common.EVENTS_TYPE_PR {
		//TODO merge
	}
	core.Log.Debugf("git clone end repoName : %v", pb.GetRepository().Name)
	return repository, nil
}
// repoToZip zips the repository at repoPath into
// <comm.WorkPath>/<comm.Build>/<buildId>/repo.zip via repo.RepoToZip and
// returns that call's results. The elapsed time is logged on return.
func (c *taskPreBuild) repoToZip(repoPath string, buildId string) (string, error) {
	began := time.Now()
	defer func() {
		core.Log.Debugf("RepoToZip end time:%.4fs", time.Since(began).Seconds())
	}()
	core.Log.Debugf("RepoToZip path:%v", repoPath)
	zipPath := filepath.Join(comm.WorkPath, comm.Build, buildId, "repo.zip")
	return repo.RepoToZip(repoPath, zipPath)
}
// removeRepo deletes path and everything below it. A failure is logged and
// returned; a panic is recovered and logged.
func (c *taskPreBuild) removeRepo(path string) error {
	defer func() {
		if r := recover(); r != nil {
			core.LogPnc.Errorf("RemoveRepo : %v", r)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
		}
	}()
	core.Log.Debug("RemoveRepo path:", fmt.Sprintf("%+v", path))
	if rmErr := os.RemoveAll(path); rmErr != nil {
		core.Log.Error(rmErr)
		return rmErr
	}
	return nil
}

// saveOrUpdatePipelineVersion stores the pipeline version for one triggered
// pipeline and returns it.
//
// The version is assembled from the request pb, the parsed yml and the stored
// pipeline pie. The creating user is taken from the user-token row matching
// the request's user name, falling back to the user row with the request's
// user id when no token row exists. When the request carries no pipeline
// version id a new version is inserted, numbered one above the highest number
// stored for the pipeline; otherwise the existing version is updated.
func (c *taskPreBuild) saveOrUpdatePipelineVersion(pb *runtime.PreBuild, yml *models.YML, pie *pipeline.TPipeline) (t *pipeline.TPipelineVersion, rterr error) {
	db := comm.DBMain.GetDB()
	rp := pb.GetRepository()
	tpv := &pipeline.TPipelineVersion{
		Id:                  pb.Info.PipelineVersionId,
		Trigger:             pb.TriggerType,
		Events:              pb.Info.Events,
		Ref:                 rp.Ref,
		Branch:              rp.Branch,
		RepoCloneUrl:        rp.CloneURL,
		CommitSha:           rp.Sha,
		RepoId:              rp.Id,
		RepoName:            rp.Name,
		CommitMessage:       pb.Info.CommitMessage,
		PipelineName:        pie.Name,
		PipelineDisplayName: pie.DisplayName,
		PipelineId:          pie.Id,
		Version:             yml.Pie.Version,
		YmlContent:          yml.YmlContent,
		JsonContent:         yml.JsonContent,
		Note:                pb.Info.Note,
		Title:               pb.Info.Title,
		PrNumber:            pb.Info.PrNumber,
		Status:              common.PIPELINE_VERSION_STATUS_RUNNING,
	}
	if pb.Info.TargetRepository != nil {
		tpv.TargetRepoName = pb.Info.TargetRepository.Name
		tpv.TargetRepoSha = pb.Info.TargetRepository.Sha
		tpv.TargetRepoRef = pb.Info.TargetRepository.Ref
		tpv.TargetRepoCloneUrl = pb.Info.TargetRepository.CloneURL
	}
	// An explicit action takes precedence over the event type.
	if pb.Info.Action != "" {
		tpv.Events = pb.Info.Action
	}
	ut := &pipeline.TUserToken{}
	get, err := db.Where("name = ? ", pb.Info.User.UserName).Get(ut)
	if err != nil {
		return nil, err
	}
	tpv.CreateUser = ut.Nick
	tpv.CreateUserId = strconv.FormatInt(ut.Uid, 10)
	if ut.Nick == "" {
		tpv.CreateUser = ut.Name
	}
	// No token row for the user name: fall back to the user table by id.
	if !get && pb.Info.User.Id != 0 {
		tu := &pipeline.TUser{}
		_, err = db.Where("id = ?", pb.Info.User.Id).Get(tu)
		if err != nil {
			return nil, err
		}
		tpv.CreateUser = tu.Nick
		tpv.CreateUserId = strconv.FormatInt(tu.Id, 10)
		if tu.Nick == "" {
			tpv.CreateUser = tu.Name
		}
	}
	if pb.Info.PipelineVersionId == "" {
		// No version yet: create one.
		tpv.Id = utils.NewXid()
		tpv.CreateTime = time.Now()
		number := int64(0)
		// The query error used to be overwritten by the insert below. It is
		// logged now but still not fatal, so numbering keeps falling back to
		// 1 exactly as before.
		// NOTE(review): max(number) is presumably NULL for a pipeline without
		// versions — confirm whether that case surfaces as an error here.
		if _, qerr := db.
			SQL("SELECT max(number) FROM t_pipeline_version WHERE pipeline_id = ?", pie.Id).
			Get(&number); qerr != nil {
			core.Log.Errorf("saveOrUpdatePipelineVersion max(number) err: %v", qerr)
		}
		tpv.Number = number + 1
		_, err = db.Insert(tpv)
	} else {
		// A version already exists: update it.
		_, err = db.Cols("repo_id,repo_name,commit_sha,commit_message,version"+
			",create_user,create_user_id,note,pipeline_name,pipeline_display_name").Where("id = ?", tpv.Id).Update(tpv)
	}
	if err != nil {
		return nil, err
	}
	return tpv, nil
}

// saveOrUpdatePipeline makes sure a pipeline row exists for pipeline p in the
// repository of pb and that a (non-deleted) branch row exists for the
// request's branch. An existing pipeline only has its display name refreshed;
// a missing one is inserted. The whole function runs under pipelinelk. The
// returned pipeline always has PipelineType set to the webhook trigger type.
func saveOrUpdatePipeline(pb *runtime.PreBuild, p *models.Pipeline) (*pipeline.TPipeline, error) {
	pipelinelk.Lock()
	defer pipelinelk.Unlock()

	db := comm.DBMain.GetDB()
	rp := pb.GetRepository()

	stored := &pipeline.TPipeline{}
	exists, err := db.Where("name = ? and repo_id = ?", p.Name, rp.Id).Get(stored)
	if err != nil {
		return nil, err
	}
	stored.PipelineType = common.TRIGGER_TYPE_WEBHOOK
	if !exists {
		stored.Id = utils.NewXid()
		stored.Name = p.Name
		stored.RepoId = rp.Id
		stored.DisplayName = p.DisplayName
		if _, err = db.Insert(stored); err != nil {
			return nil, err
		}
	} else {
		stored.DisplayName = p.DisplayName
		if _, err = db.Where("id = ?", stored.Id).Cols("display_name").Update(stored); err != nil {
			return nil, err
		}
	}

	// Record the branch for this pipeline when it is not known yet.
	branch := &pipeline.TPipelineBranch{}
	hasBranch, err := db.Where("deleted != 1 and pipeline_id = ? and repo_id = ? and name = ?",
		stored.Id, rp.Id, pb.GetRepository().Branch).Get(branch)
	if err != nil {
		return nil, err
	}
	if hasBranch {
		return stored, nil
	}
	branch.Id = utils.NewXid()
	branch.PipelineId = stored.Id
	branch.RepoId = rp.Id
	branch.Name = pb.GetRepository().Branch
	branch.CreateTime = time.Now()
	if _, err = db.Insert(branch); err != nil {
		return nil, err
	}
	return stored, nil
}

// SkipTriggerRules reports whether pipeline c should be skipped for request pb.
//
// It returns true for every non-webhook trigger, and for a webhook whose
// trigger rule for the event yields true from all three of skipCommitNotes,
// skipBranch and skipCommitMessages. It returns false in every other case,
// including when the pipeline declares no triggers, when the event type is
// unknown, or when no rule is configured for the event (comment events use the
// pull-request rule).
//
// NOTE(review): the "no triggers" / "no rule for this event" paths log an
// error yet return false, i.e. the pipeline is NOT skipped — confirm with
// product that this is the intended rule (see TODO below).
func SkipTriggerRules(pb *runtime.PreBuild, c *models.YML) bool {
	if pb.TriggerType != common.TRIGGER_TYPE_WEBHOOK {
		return true
	}

	if len(c.Pie.Triggers) == 0 {
		core.Log.Error("Triggers is empty")
		return false
	}

	var events string
	switch pb.Info.Events {
	case common.EVENTS_TYPE_PUSH, common.EVENTS_TYPE_PR:
		events = pb.Info.Events
	case common.EVENTS_TYPE_COMMENT:
		events = common.EVENTS_TYPE_PR
	default:
		core.Log.Errorf("not match action:%v", pb.Info.Events)
		return false
	}

	rule, configured := c.Pie.Triggers[events]
	if !configured {
		core.Log.Errorf("not match action: %v", pb.Info.Events)
		return false
	}
	if rule == nil {
		core.Log.Errorf("\"%v trigger is empty", pb.Info.Events)
		return false
	}

	// TODO describe the rules; ask product
	switch {
	case !skipCommitNotes(rule.Notes, pb.Info.Note):
		return false
	case !skipBranch(rule.Branches, pb.Info.Repository.Branch):
		return false
	case !skipCommitMessages(rule.CommitMessages, pb.Info.CommitMessage):
		return false
	}
	core.Log.Debugf("%v skip", c.Pie.Name)
	return true
}

// skipBranch reports whether condition c does NOT match the branch name.
func skipBranch(c *models.Condition, branch string) bool {
	matched := c.Match(branch)
	return !matched
}
// skipCommitMessages reports whether condition c does NOT match the given
// value; despite its name, the branch parameter carries the commit message
// when called from SkipTriggerRules.
func skipCommitMessages(c *models.Condition, branch string) bool {
	matched := c.Match(branch)
	return !matched
}
// skipCommitNotes reports whether condition c does NOT match the given value;
// despite its name, the branch parameter carries the request note when called
// from SkipTriggerRules.
func skipCommitNotes(c *models.Condition, branch string) bool {
	matched := c.Match(branch)
	return !matched
}
